feat: streaming column-major single-RG merge engine (PR-6b) #6409
Draft

g-talbot wants to merge 2 commits into gtt/parquet-page-decoder from
Conversation
Adds `merge::streaming::streaming_merge_sorted_parquet_files`, an async N-input → M-output merge that consumes inputs as `Box<dyn ColumnPageStream>` (PR-5a's trait) and writes each output column-by-column via `StreamingParquetWriter` (PR-2). Each output is a single row group; multi-RG output at metric_name boundaries lands in PR-6c.

Compared to the existing whole-file engine, the win is on the output side: the standard `ArrowWriter` materialises a full row group's worth of column-chunk buffers before serialising, whereas this writer flushes one column chunk at a time, so output peak memory is bounded by the largest single column chunk plus bookkeeping (page index, bloom filters), not by the total row group.

Inputs are drained one row group at a time via PR-6a's `StreamDecoder`, then concatenated per input. Per-RG decode memory is bounded; the per-input concat matches the existing engine's input shape for the merge planner. Truly per-RG input streaming (one input RG at a time across all inputs) lands when prefix=1 multi-RG inputs become the dominant compaction path — PR-6b is correct without it.

Reuses the existing permutation, KV metadata, sorting columns, MC-3 sort-order check (`super::writer`), and union schema / output optimisation (`super::schema`) helpers — they're now `pub(super)` so both the existing whole-file engine and the new streaming engine can share them. PR-7 will fold the non-streaming path away.

Tests (10, all passing): two-input simple merge, single-RG output contract, total row count preservation (MC-1), sort-schema mismatch rejected, window mismatch rejected, output has page-level statistics, KV metadata propagated (with `qh.num_merge_ops` incremented), all-empty inputs produce no output, one empty among non-empty handled, output drainable through `StreamDecoder` round-trip.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
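For orientation, a minimal call-site sketch. The PR does not show the exact signature, so the argument list, return type, and the `MergeError`/`store` names here are assumptions; only the async N-in/M-out shape, the `Box<dyn ColumnPageStream>` input type, and the single-RG-per-output contract come from the description above.

```rust
// Hypothetical call-site sketch; the real signature lives in merge::streaming
// and may differ. Only the trait-object input type and the async
// N-input -> M-output shape are taken from the PR description.
use crate::merge::streaming::streaming_merge_sorted_parquet_files;

async fn compact(inputs: Vec<Box<dyn ColumnPageStream>>) -> Result<(), MergeError> {
    // N sorted inputs in, M single-row-group Parquet outputs out. Each output
    // is written column-by-column through StreamingParquetWriter, so peak
    // output memory is one column chunk plus page-index/bloom bookkeeping.
    let outputs = streaming_merge_sorted_parquet_files(inputs).await?;
    for output in outputs {
        // Hand each finished single-RG file to the caller's sink
        // (`store` is a placeholder, not part of this PR).
        store(output).await?;
    }
    Ok(())
}
```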
Newer nightly rustfmt's `wrap_comments` reflows the module-level doc comments tighter than my local nightly did at original push time.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Force-pushed from 6226032 to add52f0
Summary
- `merge::streaming::streaming_merge_sorted_parquet_files`: async N-input → M-output merge consuming inputs as `Box<dyn ColumnPageStream>` and writing each output column-by-column via `StreamingParquetWriter` (single row group per output).
- Inputs are drained one row group at a time via `StreamDecoder`, then concatenated per input (sketched below). Per-RG decode memory is bounded; per-input concat matches the existing engine's input shape for the merge planner.
- Reuses the existing permutation / KV metadata / sorting-columns / MC-3 sort-order-check helpers (`super::writer`) and the union schema / output-optimisation helpers (`super::schema`) — now `pub(super)`. PR-7 folds the non-streaming path away.
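A sketch of the drain-and-concat step from the second bullet, assuming PR-6a's `StreamDecoder` yields one decoded `RecordBatch` per row group. The function shape and the method name `next_row_group` are illustrative; only `arrow::compute::concat_batches` is the real arrow-rs API.

```rust
use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Illustrative only: StreamDecoder is PR-6a's type, but `next_row_group` is
// an assumed method name standing in for "give me the next decoded RG".
async fn drain_input(
    schema: SchemaRef,
    mut decoder: StreamDecoder,
) -> Result<RecordBatch, ArrowError> {
    let mut row_groups: Vec<RecordBatch> = Vec::new();
    while let Some(rg) = decoder.next_row_group().await? {
        // One row group decoded at a time: per-RG decode memory stays bounded.
        row_groups.push(rg);
    }
    // Per-input concat reproduces the whole-file engine's input shape, which
    // is what the merge planner expects.
    concat_batches(&schema, &row_groups)
}
```

The accumulated batches still add up to a whole input in memory, which is why the bullet above flags truly per-RG input streaming as a follow-up rather than part of PR-6b.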
Why the win is on the output side

The standard `ArrowWriter` materialises a full row group's worth of column-chunk buffers before serialising. PR-2's `StreamingParquetWriter` flushes one column chunk at a time, so output peak memory is bounded by the largest single column chunk plus bookkeeping (page index, bloom filters), not by the row-group total.
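A hedged sketch of that write order, assuming a PR-2 `StreamingParquetWriter` with per-column-chunk methods. The method names (`start_row_group`, `write_column_chunk`, `finish`) and the `ColumnChunkData`/`WriterError` types are assumptions; the flush granularity is the point.

```rust
// Sketch only: method and type names are assumed, not PR-2's actual API. The
// contrast with ArrowWriter is the flush granularity: one column chunk
// resident at a time instead of a whole row group's worth of buffers.
async fn write_output(
    mut writer: StreamingParquetWriter,
    merged_columns: Vec<ColumnChunkData>, // one entry per output column
) -> Result<(), WriterError> {
    writer.start_row_group()?; // PR-6b: exactly one RG per output
    for column in merged_columns {
        // Serialised and flushed immediately, so peak output memory is the
        // largest single column chunk plus bookkeeping (page index, blooms).
        writer.write_column_chunk(column).await?;
    }
    writer.finish().await // footer, page index, bloom filters
}
```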
The input side stays at "drain the whole input into a `RecordBatch` per input" because per-RG streaming through the merge driver requires synchronising RG boundaries across inputs, which only works once prefix=1 multi-RG inputs are the dominant compaction case. PR-6b is correct without it; the input-side streaming optimisation is a follow-up.

Stack
Test plan
- `test_two_inputs_simple_merge` — total row count + ascending `sorted_series`
- `test_output_is_single_row_group` — PR-6b's contract: every output is exactly 1 RG
- `test_total_rows_preserved` — MC-1 invariant
- `test_sort_schema_mismatch_rejected` — input metadata validation
- `test_window_start_mismatch_rejected` — same
- `test_output_has_page_index_metadata` — column index + offset index present (query pruning)
- `test_kv_metadata_propagated_to_output` — `qh.sort_fields`, `qh.window_start`, `qh.window_duration` carry through; `qh.num_merge_ops` incremented
- `test_all_empty_inputs_no_output`
- `test_one_empty_input_among_nonempty`
- `test_output_drainable_by_stream_decoder` — round-trip output → `StreamDecoder` → row count (shape sketched below)
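The round-trip test's shape, roughly; `merge_fixture_inputs` and `StreamDecoder::open` are hypothetical helper names, and the assertion mirrors the MC-1 invariant from the list above.

```rust
// Shape of the round-trip check, not the PR's actual test body. Fixture and
// constructor names (merge_fixture_inputs, StreamDecoder::open) are assumed.
#[tokio::test]
async fn output_drainable_by_stream_decoder() {
    // Hypothetical fixture: N small sorted inputs plus their total row count.
    let (inputs, expected_rows) = merge_fixture_inputs();

    let outputs = streaming_merge_sorted_parquet_files(inputs)
        .await
        .expect("merge should succeed on valid sorted inputs");

    // Drain every output back through PR-6a's StreamDecoder and count rows.
    let mut rows = 0;
    for output in outputs {
        let mut decoder = StreamDecoder::open(output).expect("output must reopen");
        while let Some(rg) = decoder.next_row_group().await.expect("decode") {
            rows += rg.num_rows();
        }
    }
    assert_eq!(rows, expected_rows); // MC-1: total row count preserved
}
```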
CI gates locally green: `clippy --workspace --all-features --tests` with `-D warnings`, nightly `fmt --check`, `cargo doc --no-deps`, `cargo machete`, license headers, log format, typos. 411/411 crate tests pass.

🤖 Generated with Claude Code